diff --git a/lib/TODO.md b/lib/TODO.md index d6d01e4..85b1303 100644 --- a/lib/TODO.md +++ b/lib/TODO.md @@ -1,13 +1,17 @@ To Do ===== -- **IMPORTANT** dimensional compression and PC_Uncompress probably don't play will together... once serialized, patches don't know their compression anymore, so it's possible to pass a NONE patch up into deserialize and have it treated as a DIMENSIONAL - (?) convert PCBYTES to use PCDIMENSION* instead of holding all values as dupes - (??) convert PCBYTES handling to pass-by-reference instead of pass-by-value -- implement PC_PatchAvg/PC_PatchMin/PC_PatchMax as C functions against patches with dimensional and uncompressed implementations -- update pc_patch_from_patchlist to merge dimensional patchlists directly -- TESTS for pc_patch_dimensional_from_uncompressed() and pc_patch_dimensional_compress() +- implement PC\_PatchAvg/PC\_PatchMin/PC\_PatchMax as C functions against patches with dimensional and uncompressed implementations +- TESTS for pc\_patch\_dimensional\_from\_uncompressed() and pc\_patch\_dimensional\_compress() +- Add in dimensional stats caching to speed up dimensional compression in batch cases + - Add Min/Max values to dimensional compression serialization (???) +- Add Min/Max values to GHT compression serialization (???) + +- Update pc\_patch\_from\_patchlist() to merge GHT patches without decompression +- Update pc\_patch\_from\_patchlist() to merge dimensional patches directly - Before doing dimensional compression, sort by geohash (actually by a localized geohash based on the patch bounds). This will enhance the autocorrelation of values and improve run-length encoding in particular - Add Min/Max values to front of GHT serialization @@ -26,10 +30,10 @@ Use Cases to Support More Functions -------------- -- PC_FilterEquals(patch, dimension, value) returns patch -- PC_FilterLessThan(patch, dimension, value) returns patch -- PC_FilterGreaterThan(patch, dimension, value) returns patch -- PC_FilterBetween(patch, dimension, valuemin, valuemax) returns patch -- PC_FilterPolygon(patch, wkb) returns patch -- PC_Filter(patch, dimension, expression) returns patch -- PC_Get(pcpatch, dimname) returns Array(numeric) +- PC\_FilterEquals(patch, dimension, value) returns patch +- PC\_FilterLessThan(patch, dimension, value) returns patch +- PC\_FilterGreaterThan(patch, dimension, value) returns patch +- PC\_FilterBetween(patch, dimension, valuemin, valuemax) returns patch +- PC\_FilterPolygon(patch, wkb) returns patch +- PC\_Filter(patch, dimension, expression) returns patch +- PC\_Get(pcpatch, dimname) returns Array(numeric) diff --git a/lib/pc_api.h b/lib/pc_api.h index c3eff69..0a69283 100644 --- a/lib/pc_api.h +++ b/lib/pc_api.h @@ -185,11 +185,7 @@ typedef struct uint32_t npoints; /* How many points we have */ double xmin, xmax, ymin, ymax; size_t ghtsize; -#ifdef HAVE_LIBGHT - GhtTreePtr ght; -#else - uint8_t *ght -#endif + uint8_t *ght; } PCPATCH_GHT; diff --git a/lib/pc_api_internal.h b/lib/pc_api_internal.h index c8ab7f4..77ecf31 100644 --- a/lib/pc_api_internal.h +++ b/lib/pc_api_internal.h @@ -168,6 +168,7 @@ int pc_patch_uncompressed_add_point(PCPATCH_UNCOMPRESSED *c, const PCPOINT *p); PCPATCH_GHT* pc_patch_ght_from_uncompressed(const PCPATCH_UNCOMPRESSED *pa); PCPATCH_GHT* pc_patch_ght_from_pointlist(const PCPOINTLIST *pdl); PCPATCH_UNCOMPRESSED* pc_patch_uncompressed_from_ght(const PCPATCH_GHT *pght); +void pc_patch_ght_free(PCPATCH_GHT *paght); diff --git a/lib/pc_patch.c b/lib/pc_patch.c index 973baa6..3f73612 100644 --- a/lib/pc_patch.c +++ b/lib/pc_patch.c @@ -46,7 +46,7 @@ pc_patch_free(PCPATCH *patch) } case PC_GHT: { - pcerror("pc_patch_free: GHT not supported"); + pc_patch_ght_free((PCPATCH_GHT*)patch); break; } case PC_DIMENSIONAL: @@ -97,7 +97,7 @@ pc_patch_compress(const PCPATCH *patch, void *userdata) return (PCPATCH*)patch; } - pcerror("pc_patch_compress: cannot convert patch compressed %d to compressed %d", patch_compression, schema_compression); + pcerror("%s: cannot convert patch compressed %d to compressed %d", __func__, patch_compression, schema_compression); return NULL; } @@ -120,7 +120,7 @@ pc_patch_uncompress(const PCPATCH *patch) if ( patch_compression == PC_GHT ) { - pcerror("pc_patch_uncompress: GHT compression not yet supported"); + pcerror("%s: GHT compression not yet supported", __func__); return NULL; } @@ -142,7 +142,7 @@ pc_patch_from_wkb(const PCSCHEMA *s, uint8_t *wkb, size_t wkbsize) if ( ! wkbsize ) { - pcerror("pc_patch_from_wkb: zero length wkb"); + pcerror("%s: zero length wkb", __func__); } /* @@ -155,7 +155,7 @@ pc_patch_from_wkb(const PCSCHEMA *s, uint8_t *wkb, size_t wkbsize) if ( pcid != s->pcid ) { - pcerror("pc_patch_from_wkb: wkb pcid (%d) not consistent with schema pcid (%d)", pcid, s->pcid); + pcerror("%s: wkb pcid (%d) not consistent with schema pcid (%d)", __func__, pcid, s->pcid); } switch ( compression ) @@ -170,13 +170,13 @@ pc_patch_from_wkb(const PCSCHEMA *s, uint8_t *wkb, size_t wkbsize) } case PC_GHT: { - pcerror("pc_patch_from_wkb: GHT compression not yet supported"); + pcerror("%s: GHT compression not yet supported", __func__); return NULL; } } /* Don't get here */ - pcerror("pc_patch_from_wkb: unknown compression '%d' requested", compression); + pcerror("%s: unknown compression '%d' requested", __func__, compression); return NULL; } @@ -203,11 +203,11 @@ pc_patch_to_wkb(const PCPATCH *patch, size_t *wkbsize) } case PC_GHT: { - pcerror("pc_patch_to_wkb: GHT compression not yet supported"); + pcerror("%s: GHT compression not yet supported", __func__); return NULL; } } - pcerror("pc_patch_to_wkb: unknown compression requested '%d'", patch->schema->compression); + pcerror("%s: unknown compression requested '%d'", __func__, patch->schema->compression); return NULL; } @@ -221,7 +221,7 @@ pc_patch_to_string(const PCPATCH *patch) case PC_DIMENSIONAL: return pc_patch_dimensional_to_string((PCPATCH_DIMENSIONAL*)patch); } - pcerror("pc_patch_to_string: unsupported compression %d requested", patch->type); + pcerror("%s: unsupported compression %d requested", __func__, patch->type); } static uint8_t * @@ -326,7 +326,7 @@ pc_patch_from_patchlist(PCPATCH **palist, int numpatches) { if ( schema->pcid != palist[i]->schema->pcid ) { - pcerror("pc_patch_from_patchlist: inconsistent schemas in input"); + pcerror("%s: inconsistent schemas in input", __func__); return NULL; } totalpoints += palist[i]->npoints; @@ -360,7 +360,11 @@ pc_patch_from_patchlist(PCPATCH **palist, int numpatches) } case PC_GHT: { - pcerror("pc_patch_from_patchlist: GHT compression not yet supported"); + PCPATCH_UNCOMPRESSED *pu = pc_patch_uncompressed_from_ght((const PCPATCH_GHT*)pa); + size_t sz = pu->schema->size * pu->npoints; + memcpy(buf, pu->data, sz); + buf += sz; + pc_patch_uncompressed_free(pu); break; } case PC_NONE: @@ -373,7 +377,7 @@ pc_patch_from_patchlist(PCPATCH **palist, int numpatches) } default: { - pcerror("pc_patch_from_patchlist: unknown compresseion type", pa->type); + pcerror("%s: unknown compresseion type", __func__, pa->type); break; } } diff --git a/lib/pc_patch_ght.c b/lib/pc_patch_ght.c index 0885dbb..9231e76 100644 --- a/lib/pc_patch_ght.c +++ b/lib/pc_patch_ght.c @@ -224,7 +224,7 @@ pc_patch_ght_from_uncompressed(const PCPATCH_UNCOMPRESSED *pa) ght_writer_free(writer); } - ght_tree_free(tree); + ght_tree_free(tree); return paght; #endif } @@ -240,9 +240,11 @@ pc_patch_ght_free(PCPATCH_GHT *paght) assert(paght); assert(paght->schema); - if ( paght->ght ) + /* A readonly tree won't own it's ght buffer, */ + /* so only free a readwrite tree */ + if ( ! paght->readonly ) { - ght_tree_free(paght->ght); + pcfree(paght->ght); } pcfree(paght); @@ -353,10 +355,78 @@ pc_patch_uncompressed_from_ght(const PCPATCH_GHT *paght) } +PCPATCH * +pc_patch_ght_from_wkb(const PCSCHEMA *schema, const uint8_t *wkb, size_t wkbsize) +{ +#ifndef HAVE_LIBGHT + pcerror("%s: libght support is not enabled", __func__); + return NULL; +#else + + /* + byte: endianness (1 = NDR, 0 = XDR) + uint32: pcid (key to POINTCLOUD_SCHEMAS) + uint32: compression (0 = no compression, 1 = dimensional, 2 = GHT) + uint32: npoints + uint32: ghtsize + uint8[]: ghtbuffer + */ + + static size_t hdrsz = 1+4+4+4; /* endian + pcid + compression + npoints */ + PCPATCH_GHT *patch; + uint8_t swap_endian = (wkb[0] != machine_endian()); + uint32_t npoints; + size_t ghtsize; + const uint8_t *buf; + GhtTreePtr tree; + GhtArea area; + + if ( wkb_get_compression(wkb) != PC_GHT ) + { + pcerror("%s: call with wkb that is not GHT compressed", __func__); + return NULL; + } + + npoints = wkb_get_npoints(wkb); + + patch = pcalloc(sizeof(PCPATCH_GHT)); + patch->npoints = npoints; + patch->type = PC_GHT; + patch->schema = schema; + patch->readonly = PC_FALSE; + + /* Start on the GHT */ + buf = wkb+hdrsz; + ghtsize = wkb_get_int32(buf, swap_endian); + buf += sizeof(int32_t); /* Move to start of GHT buffer */ + + /* Copy in the tree buffer */ + patch->ght = pcalloc(ghtsize); + memcpy(patch->ght, buf, ghtsize); + + /* Get a tree */ + tree = ght_tree_from_pc_patch(patch); + if ( ! tree ) return NULL; + + /* Calculate bounds and save */ + if ( GHT_OK != ght_tree_get_extent(tree, &area) ) + return NULL; + + patch->xmin = area.x.min; + patch->xmax = area.x.min; + patch->ymin = area.y.min; + patch->ymax = area.y.min; + + ght_tree_free(tree); + + return (PCPATCH*)patch; +#endif +} + + + #if 0 - - char * pc_patch_ght_to_string(const PCPATCH_GHT *pa) { @@ -438,54 +508,7 @@ pc_patch_ght_to_wkb(const PCPATCH_GHT *patch, size_t *wkbsize) } -PCPATCH * -pc_patch_ght_from_wkb(const PCSCHEMA *schema, const uint8_t *wkb, size_t wkbsize) -{ - /* - byte: endianness (1 = NDR, 0 = XDR) - uint32: pcid (key to POINTCLOUD_SCHEMAS) - uint32: compression (0 = no compression, 1 = dimensional, 2 = GHT) - uint32: npoints - dimensions[]: dims (interpret relative to pcid and compressions) - */ - static size_t hdrsz = 1+4+4+4; /* endian + pcid + compression + npoints */ - PCPATCH_GHT *patch; - uint8_t swap_endian = (wkb[0] != machine_endian()); - uint32_t npoints, ndims; - const uint8_t *buf; - int i; - if ( wkb_get_compression(wkb) != PC_DIMENSIONAL ) - { - pcerror("pc_patch_ght_from_wkb: call with wkb that is not dimensionally compressed"); - return NULL; - } - - npoints = wkb_get_npoints(wkb); - ndims = schema->ndims; - - patch = pcalloc(sizeof(PCPATCH_GHT)); - patch->npoints = npoints; - patch->type = PC_DIMENSIONAL; - patch->schema = schema; - patch->readonly = PC_FALSE; - patch->bytes = pcalloc(ndims*sizeof(PCBYTES)); - - buf = wkb+hdrsz; - for ( i = 0; i < ndims; i++ ) - { - PCBYTES *pcb = &(patch->bytes[i]); - PCDIMENSION *dim = schema->dims[i]; - pc_bytes_deserialize(buf, dim, pcb, PC_FALSE /*readonly*/, swap_endian); - pcb->npoints = npoints; - buf += pc_bytes_serialized_size(pcb); - } - - if ( PC_FAILURE == pc_patch_ght_compute_extent(patch) ) - pcerror("pc_patch_ght_compute_extent failed"); - - return (PCPATCH*)patch; -} #endif diff --git a/pgsql/pc_pgsql.c b/pgsql/pc_pgsql.c index c3f7d07..1bad16b 100644 --- a/pgsql/pc_pgsql.c +++ b/pgsql/pc_pgsql.c @@ -174,11 +174,11 @@ pc_patch_from_hexwkb(const char *hexwkb, size_t hexlen, FunctionCallInfoData *fc size_t wkblen = hexlen/2; pcid = wkb_get_pcid(wkb); if ( ! pcid ) - elog(ERROR, "pc_patch_from_hexwkb: pcid is zero"); + elog(ERROR, "%s: pcid is zero", __func__); schema = pc_schema_from_pcid(pcid, fcinfo); if ( ! schema ) - elog(ERROR, "pc_patch_from_hexwkb: unable to look up schema entry"); + elog(ERROR, "%s: unable to look up schema entry", __func__); patch = pc_patch_from_wkb(schema, wkb, wkblen); pfree(wkb); @@ -226,7 +226,7 @@ pc_schema_from_pcid_uncached(uint32 pcid) if (SPI_OK_CONNECT != SPI_connect ()) { SPI_finish(); - elog(ERROR, "pc_schema_from_pcid: could not connect to SPI manager"); + elog(ERROR, "%s: could not connect to SPI manager", __func__); return NULL; } @@ -237,7 +237,7 @@ pc_schema_from_pcid_uncached(uint32 pcid) if ( err < 0 ) { SPI_finish(); - elog(ERROR, "pc_schema_from_pcid: error (%d) executing query: %s", err, sql); + elog(ERROR, "%s: error (%d) executing query: %s", __func__, err, sql); return NULL; } @@ -457,7 +457,8 @@ pc_patch_serialized_size(const PCPATCH *patch) } case PC_GHT: { - pcerror("pc_patch_serialized_size: GHT format not yet supported"); + PCPATCH_GHT *pg = (PCPATCH_GHT*)patch; + return sizeof(SERIALIZED_PATCH) - 1 + pg->ghtsize; } case PC_DIMENSIONAL: { @@ -465,7 +466,7 @@ pc_patch_serialized_size(const PCPATCH *patch) } default: { - pcerror("pc_patch_serialized_size: unknown compresed %d", patch->type); + pcerror("%s: unknown compresed %d", __func__, patch->type); } } return -1; @@ -507,6 +508,31 @@ pc_patch_dimensional_serialize(const PCPATCH *patch_in) return serpch; } + +static SERIALIZED_PATCH * +pc_patch_ght_serialize(const PCPATCH *patch_in) +{ + size_t serpch_size = pc_patch_serialized_size(patch_in); + SERIALIZED_PATCH *serpch = pcalloc(serpch_size); + const PCPATCH_GHT *patch = (PCPATCH_GHT*)patch_in; + + assert(patch_in); + assert(patch_in->type == PC_GHT); + + /* Copy basics */ + serpch->pcid = patch->schema->pcid; + serpch->npoints = patch->npoints; + serpch->xmin = patch->xmin; + serpch->ymin = patch->ymin; + serpch->xmax = patch->xmax; + serpch->ymax = patch->ymax; + serpch->compression = patch->type; + + memcpy(serpch->data, patch->ght, patch->ghtsize); + SET_VARSIZE(serpch, serpch_size); + return serpch; +} + static SERIALIZED_PATCH * pc_patch_uncompressed_serialize(const PCPATCH *patch_in) { @@ -562,12 +588,12 @@ pc_patch_serialize(const PCPATCH *patch_in, void *userdata) } case PC_GHT: { - pcerror("pc_patch_serialize: GHT compression currently unsupported"); + serpatch = pc_patch_ght_serialize(patch); break; } default: { - pcerror("pc_patch_serialize: unsupported compression type %d", patch->type); + pcerror("%s: unsupported compression type %d", __func__, patch->type); } } @@ -632,6 +658,18 @@ pc_patch_uncompressed_deserialize(const SERIALIZED_PATCH *serpatch, const PCSCHE static PCPATCH * pc_patch_dimensional_deserialize(const SERIALIZED_PATCH *serpatch, const PCSCHEMA *schema) { + // typedef struct + // { + // uint32_t size; + // uint32_t pcid; + // uint32_t compression; + // uint32_t npoints; + // double xmin, xmax, ymin, ymax; + // data: + // pcbytes[ndims]; + // } + // SERIALIZED_PATCH; + PCPATCH_DIMENSIONAL *patch; int i; const uint8_t *buf; @@ -667,6 +705,54 @@ pc_patch_dimensional_deserialize(const SERIALIZED_PATCH *serpatch, const PCSCHEM return (PCPATCH*)patch; } +/* +* We don't do any radical deserialization here. Don't build out the tree, just +* set up pointers to the start of the buffer, so we can build it out later +* if necessary. +*/ +static PCPATCH * +pc_patch_ght_deserialize(const SERIALIZED_PATCH *serpatch, const PCSCHEMA *schema) +{ + // typedef struct + // { + // uint32_t size; + // uint32_t pcid; + // uint32_t compression; + // uint32_t npoints; + // double xmin, xmax, ymin, ymax; + // data: + // uint32_t ghtsize; + // uint8_t ght[]; + // } + // SERIALIZED_PATCH; + + PCPATCH_GHT *patch; + uint32_t ghtsize; + const uint8_t *buf = serpatch->data; + int npoints = serpatch->npoints; + + /* Reference the external data */ + patch = pcalloc(sizeof(PCPATCH_GHT)); + + /* Set up basic info */ + patch->type = serpatch->compression; + patch->schema = schema; + patch->readonly = true; + patch->npoints = npoints; + patch->xmin = serpatch->xmin; + patch->ymin = serpatch->ymin; + patch->xmax = serpatch->xmax; + patch->ymax = serpatch->ymax; + + /* Set up ght buffer */ + memcpy(&ghtsize, buf, 4); + patch->ghtsize = ghtsize; + patch->ght = buf + 4; + + /* That's it */ + return (PCPATCH*)patch; +} + PCPATCH * pc_patch_deserialize(const SERIALIZED_PATCH *serpatch, const PCSCHEMA *schema) { @@ -677,8 +763,8 @@ pc_patch_deserialize(const SERIALIZED_PATCH *serpatch, const PCSCHEMA *schema) case PC_DIMENSIONAL: return pc_patch_dimensional_deserialize(serpatch, schema); case PC_GHT: - pcerror("pc_patch_deserialize: GHT compression currently unsupported"); + return pc_patch_ght_deserialize(serpatch, schema); } - pcerror("pc_patch_deserialize: unsupported compression type"); + pcerror("%s: unsupported compression type", __func__); return NULL; } diff --git a/pgsql/pc_pgsql.h b/pgsql/pc_pgsql.h index 54e7c2b..d275d88 100644 --- a/pgsql/pc_pgsql.h +++ b/pgsql/pc_pgsql.h @@ -58,12 +58,12 @@ SERIALIZED_POINT; */ typedef struct { - uint32_t size; - uint32_t pcid; + uint32_t size; + uint32_t pcid; uint32_t compression; - uint32_t npoints; - double xmin, xmax, ymin, ymax; - uint8_t data[1]; + uint32_t npoints; + double xmin, xmax, ymin, ymax; + uint8_t data[1]; } SERIALIZED_PATCH;